{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Migrating from Spark to BigQuery via Dataproc -- Part 3\n",
    "\n",
    "* [Part 1](01_spark.ipynb): The original Spark code, now running on Dataproc (lift-and-shift).\n",
    "* [Part 2](02_gcs.ipynb): Replace HDFS with Google Cloud Storage. This enables job-specific clusters. (cloud-native)\n",
    "* [Part 3](03_automate.ipynb): Automate everything, so that we can run in a job-specific cluster. (cloud-optimized)\n",
    "* [Part 4](04_bigquery.ipynb): Load CSV into BigQuery, use BigQuery. (modernize)\n",
    "* [Part 5](05_functions.ipynb): Using Cloud Functions, launch analysis every time there is a new file in the bucket. (serverless)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Catch up: data to GCS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Catch-up cell. Run this only if you skipped the previous notebooks in this sequence.\n",
    "!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz\n",
    "BUCKET='cloud-training-demos-ml'  # CHANGE\n",
    "!pip install google-compute-engine\n",
    "!gcloud storage cp kdd* gs://$BUCKET/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "BUCKET='cloud-training-demos-ml'  # CHANGE\n",
    "!gcloud storage ls gs://$BUCKET/kdd*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Python file\n",
    "\n",
    "Put all the code in a Python file. We can comment out display-only code such as ```take()``` and ```show()```.\n",
    "\n",
    "Changeable settings such as ```BUCKET``` should come from command-line arguments, parsed here with ```argparse```."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile spark_analysis.py\n",
    "\n",
    "import argparse\n",
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument(\"--bucket\", required=True, help=\"bucket for input and output\")\n",
    "args = parser.parse_args()\n",
    "\n",
    "BUCKET = args.bucket"
   ]
  },
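  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The flag handling above can be tried in isolation. This is a minimal sketch, not part of the script: `parse_bucket` is a hypothetical helper and `my-bucket` is a placeholder name. Passing an explicit argument list to `parse_args` shows how the value ends up in the input path without needing a shell.\n",
    "\n",
    "```python\n",
    "import argparse\n",
    "\n",
    "\n",
    "def parse_bucket(argv):\n",
    "    # Hypothetical helper: same flag as spark_analysis.py.\n",
    "    # required=True makes a missing --bucket fail at startup.\n",
    "    parser = argparse.ArgumentParser()\n",
    "    parser.add_argument('--bucket', required=True,\n",
    "                        help='bucket for input and output')\n",
    "    return parser.parse_args(argv).bucket\n",
    "\n",
    "\n",
    "# 'my-bucket' is a placeholder, not a real bucket.\n",
    "bucket = parse_bucket(['--bucket=my-bucket'])\n",
    "data_file = 'gs://{}/kddcup.data_10_percent.gz'.format(bucket)\n",
    "print(data_file)\n",
    "```\n",
    "\n",
    "Marking the flag as required is one possible design choice: a missing `--bucket` then stops the run immediately instead of surfacing later as a malformed `gs://` path."
   ]
  },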
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "from pyspark.sql import SparkSession, SQLContext, Row\n",
    "\n",
    "spark = SparkSession.builder.appName(\"kdd\").getOrCreate()\n",
    "sc = spark.sparkContext\n",
    "data_file = \"gs://{}/kddcup.data_10_percent.gz\".format(BUCKET)\n",
    "raw_rdd = sc.textFile(data_file).cache()\n",
    "#raw_rdd.take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "csv_rdd = raw_rdd.map(lambda row: row.split(\",\"))\n",
    "parsed_rdd = csv_rdd.map(lambda r: Row(\n",
    "    duration=int(r[0]), \n",
    "    protocol_type=r[1],\n",
    "    service=r[2],\n",
    "    flag=r[3],\n",
    "    src_bytes=int(r[4]),\n",
    "    dst_bytes=int(r[5]),\n",
    "    wrong_fragment=int(r[7]),\n",
    "    urgent=int(r[8]),\n",
    "    hot=int(r[9]),\n",
    "    num_failed_logins=int(r[10]),\n",
    "    num_compromised=int(r[12]),\n",
    "    su_attempted=r[14],\n",
    "    num_root=int(r[15]),\n",
    "    num_file_creations=int(r[16]),\n",
    "    label=r[-1]\n",
    "    )\n",
    ")\n",
    "#parsed_rdd.take(5)"
   ]
  },
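  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To check which CSV column feeds which field, the mapping can be tried on a single line without Spark. This is a minimal sketch under assumptions: `parse_line` is a hypothetical helper, the sample line only follows the dataset's comma-separated layout (its values are illustrative), and a plain dict stands in for `Row` with a subset of the fields.\n",
    "\n",
    "```python\n",
    "# One line in the dataset's CSV layout (values are illustrative).\n",
    "sample = ('0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,'\n",
    "          '0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,'\n",
    "          '0.00,0.00,0.00,0.00,normal.')\n",
    "\n",
    "\n",
    "def parse_line(line):\n",
    "    # Hypothetical helper: same column indices as the Row(...) call,\n",
    "    # returned as a plain dict so it runs without Spark.\n",
    "    r = line.split(',')\n",
    "    return {\n",
    "        'duration': int(r[0]),\n",
    "        'protocol_type': r[1],\n",
    "        'service': r[2],\n",
    "        'flag': r[3],\n",
    "        'src_bytes': int(r[4]),\n",
    "        'dst_bytes': int(r[5]),\n",
    "        'label': r[-1],  # the last column holds the label\n",
    "    }\n",
    "\n",
    "\n",
    "parsed = parse_line(sample)\n",
    "print(parsed['protocol_type'], parsed['src_bytes'], parsed['label'])\n",
    "```\n",
    "\n",
    "Note that the label keeps its trailing period (`normal.`), which is why the SQL later compares against `'normal.'`."
   ]
  },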
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "sqlContext = SQLContext(sc)\n",
    "df = sqlContext.createDataFrame(parsed_rdd)\n",
    "connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)\n",
    "connections_by_protocol.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "df.createOrReplaceTempView(\"connections\")\n",
    "attack_stats = sqlContext.sql(\"\"\"\n",
    "                           SELECT \n",
    "                             protocol_type, \n",
    "                             CASE label\n",
    "                               WHEN 'normal.' THEN 'no attack'\n",
    "                               ELSE 'attack'\n",
    "                             END AS state,\n",
    "                             COUNT(*) as total_freq,\n",
    "                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,\n",
    "                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,\n",
    "                             ROUND(AVG(duration), 2) as mean_duration,\n",
    "                             SUM(num_failed_logins) as total_failed_logins,\n",
    "                             SUM(num_compromised) as total_compromised,\n",
    "                             SUM(num_file_creations) as total_file_creations,\n",
    "                             SUM(su_attempted) as total_root_attempts,\n",
    "                             SUM(num_root) as total_root_accesses\n",
    "                           FROM connections\n",
    "                           GROUP BY protocol_type, state\n",
    "                           ORDER BY 3 DESC\n",
    "                           \"\"\")\n",
    "attack_stats.show()"
   ]
  },
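  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what the `CASE` expression and the `GROUP BY protocol_type, state` clause do, the query shape can be tried on a tiny in-memory table. This is an illustration under assumptions, not the Spark job: SQLite stands in for Spark SQL, the four rows are made up, and only two of the aggregate columns are computed.\n",
    "\n",
    "```python\n",
    "import sqlite3\n",
    "\n",
    "# SQLite stands in for Spark SQL here; the rows are made up.\n",
    "conn = sqlite3.connect(':memory:')\n",
    "conn.execute('CREATE TABLE connections '\n",
    "             '(protocol_type TEXT, label TEXT, src_bytes INTEGER)')\n",
    "conn.executemany('INSERT INTO connections VALUES (?, ?, ?)', [\n",
    "    ('tcp', 'normal.', 100),\n",
    "    ('tcp', 'smurf.', 50),\n",
    "    ('icmp', 'smurf.', 10),\n",
    "    ('icmp', 'neptune.', 30),\n",
    "])\n",
    "\n",
    "# Every label other than 'normal.' collapses into the 'attack' state,\n",
    "# so both icmp rows land in the same group.\n",
    "rows = conn.execute('''\n",
    "    SELECT protocol_type,\n",
    "           CASE label WHEN 'normal.' THEN 'no attack' ELSE 'attack' END AS state,\n",
    "           COUNT(*) AS total_freq,\n",
    "           ROUND(AVG(src_bytes), 2) AS mean_src_bytes\n",
    "    FROM connections\n",
    "    GROUP BY protocol_type, state\n",
    "    ORDER BY 3 DESC\n",
    "''').fetchall()\n",
    "print(rows[0])\n",
    "```\n",
    "\n",
    "`ORDER BY 3 DESC` sorts by the third selected column, `total_freq`, so the largest group comes first."
   ]
  },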
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "import matplotlib\n",
    "matplotlib.use('agg')  # non-interactive backend, since the script runs without a display\n",
    "ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Write out report\n",
    "\n",
    "Make sure to copy the output to GCS so that we can safely delete the cluster. The script has to be pure Python, so replace the shell commands with equivalent Python code."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "ax[0].get_figure().savefig('report.png')\n",
    "#!gcloud storage rm --recursive --continue-on-error gs://$BUCKET/sparktobq/\n",
    "#!gcloud storage cp report.png gs://$BUCKET/sparktobq/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "import google.cloud.storage as gcs\n",
    "bucket = gcs.Client().get_bucket(BUCKET)\n",
    "for blob in bucket.list_blobs(prefix='sparktobq/'):\n",
    "    blob.delete()\n",
    "bucket.blob('sparktobq/report.png').upload_from_filename('report.png')"
   ]
  },
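  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The shell-to-Python mapping can be dry-run without touching a real bucket. This is a minimal sketch under assumptions: `replace_report`, `FakeBucket`, and `FakeBlob` are hypothetical names, and the fakes only imitate the three client methods the script calls (`list_blobs`, `blob`, `upload_from_filename`) plus `delete`.\n",
    "\n",
    "```python\n",
    "def replace_report(bucket, local_path, prefix='sparktobq/'):\n",
    "    # Hypothetical helper. `bucket` is expected to behave like a\n",
    "    # google.cloud.storage bucket object.\n",
    "    # Stands in for: gcloud storage rm --recursive gs://$BUCKET/sparktobq/\n",
    "    for blob in bucket.list_blobs(prefix=prefix):\n",
    "        blob.delete()\n",
    "    # Stands in for: gcloud storage cp report.png gs://$BUCKET/sparktobq/\n",
    "    name = prefix + local_path.split('/')[-1]\n",
    "    bucket.blob(name).upload_from_filename(local_path)\n",
    "    return name\n",
    "\n",
    "\n",
    "class FakeBlob:\n",
    "    # In-memory stand-in for a storage blob, for a dry run only.\n",
    "    def __init__(self, store, name):\n",
    "        self.store, self.name = store, name\n",
    "\n",
    "    def delete(self):\n",
    "        self.store.discard(self.name)\n",
    "\n",
    "    def upload_from_filename(self, filename):\n",
    "        self.store.add(self.name)\n",
    "\n",
    "\n",
    "class FakeBucket:\n",
    "    # In-memory stand-in for a storage bucket, for a dry run only.\n",
    "    def __init__(self, names):\n",
    "        self.store = set(names)\n",
    "\n",
    "    def list_blobs(self, prefix=''):\n",
    "        return [FakeBlob(self.store, n) for n in sorted(self.store)\n",
    "                if n.startswith(prefix)]\n",
    "\n",
    "    def blob(self, name):\n",
    "        return FakeBlob(self.store, name)\n",
    "\n",
    "\n",
    "fake = FakeBucket(['sparktobq/old.png', 'kddcup.data_10_percent.gz'])\n",
    "uploaded = replace_report(fake, 'report.png')\n",
    "print(uploaded, sorted(fake.store))\n",
    "```\n",
    "\n",
    "Only objects under the `sparktobq/` prefix are deleted, so the input data file at the bucket root is left alone."
   ]
  },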
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile -a spark_analysis.py\n",
    "\n",
    "connections_by_protocol.write.format(\"csv\").mode(\"overwrite\").save(\n",
    "    \"gs://{}/sparktobq/connections_by_protocol\".format(BUCKET))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Test automation\n",
    "\n",
    "Run the script standalone, passing the bucket name with ```--bucket```."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "BUCKET='cloud-training-demos-ml'  # CHANGE\n",
    "print('Writing to {}'.format(BUCKET))\n",
    "!python spark_analysis.py --bucket=$BUCKET"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage ls gs://$BUCKET/sparktobq/**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
